Kafka Consumer Groups

What are Kafka Consumer Groups?

A Kafka Consumer Group is a set of consumers that work together to consume messages from one or more topics. Consumer groups provide load balancing and fault tolerance by distributing partitions among consumers in the group. Each partition is consumed by exactly one consumer within a group, ensuring that messages are processed in parallel while maintaining ordering within each partition.

Key Characteristics of Consumer Groups

1. Load Balancing

  • Partitions are distributed among consumers in the group
  • Each partition is consumed by only one consumer in the group
  • Automatic rebalancing when consumers join or leave

2. Fault Tolerance

  • If a consumer fails, its partitions are reassigned to the remaining consumers
  • Committed offsets let the new owner resume where the failed consumer left off; with at-least-once delivery some messages may be reprocessed, but none are skipped
  • Automatic recovery and rebalancing

3. Scalability

  • Add more consumers to increase processing capacity (up to one consumer per partition)
  • Remove consumers to reduce resource usage
  • Dynamic scaling based on load; a quick demonstration follows below
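
To see these behaviors in action, assuming a local broker and a multi-partition topic named my-topic (both placeholders), start the same console consumer in two terminals with a shared group ID and watch the partitions split between them:

# Terminal 1 and Terminal 2: run the same command in each
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --group demo-group

# In a third terminal, inspect the assignment (and the rebalance
# that follows when one of the consumers is stopped)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group demo-group --describe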

Consumer Group Architecture

1. Group Coordinator

// Each consumer group is managed by one broker that acts as its group
// coordinator. The coordinator:
//   - manages consumer group membership
//   - distributes the partition assignment (computed by the group leader,
//     one elected consumer in the group)
//   - tracks consumer heartbeats and evicts dead members
//   - stores committed offsets in the internal __consumer_offsets topic
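
The coordinator for a group can be inspected from any client. A minimal sketch using the AdminClient; the group ID and bootstrap address are placeholders:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import java.util.Collections;
import java.util.Properties;

public class CoordinatorLookup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConsumerGroupDescription description = admin
                    .describeConsumerGroups(Collections.singletonList("my-consumer-group"))
                    .describedGroups().get("my-consumer-group").get();

            // coordinator() identifies the broker currently managing this group
            System.out.println("Coordinator: " + description.coordinator());
        }
    }
}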

2. Partition Assignment

// Example partition assignment
Topic: my-topic (6 partitions)
Consumer Group: my-group (3 consumers)

Consumer-1: partitions [0, 1]
Consumer-2: partitions [2, 3]
Consumer-3: partitions [4, 5]

// With 6 consumers, each would own exactly one partition; a 7th consumer
// would sit idle, because a partition is never split across consumers
// in the same group.

3. Rebalancing Process

// When a consumer joins or leaves the group:
//
// Eager rebalancing (classic protocol):
//   1. Every consumer revokes all of its partitions and stops consuming
//   2. The coordinator collects subscriptions and the group leader computes
//      a new assignment
//   3. Partitions are redistributed among the current members
//   4. Consumption resumes under the new assignment
//
// Cooperative rebalancing (CooperativeStickyAssignor) revokes only the
// partitions that actually move, so unaffected consumers keep processing.
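
Applications can hook into rebalances by registering a ConsumerRebalanceListener when subscribing, typically to commit offsets or flush state before partitions are taken away. A minimal sketch, reusing the topic name from the examples in this article:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Collections;

public class RebalanceAwareSubscription {
    public static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before partitions are reassigned: commit progress now
                System.out.println("Revoked: " + partitions);
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called once the new assignment is installed
                System.out.println("Assigned: " + partitions);
            }
        });
    }
}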

Consumer Group Configuration

1. Essential Configuration Properties

// Consumer group configuration
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

// Group behavior settings
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

2. Key Configuration Parameters

Parameter              Description                 Default                         Recommended
group.id               Consumer group identifier   (none)                          Required for group functionality
session.timeout.ms     Session timeout             10000 (45000 since Kafka 3.0)   30000-60000
heartbeat.interval.ms  Heartbeat interval          3000                            ~1/3 of session.timeout.ms (e.g. 10000)
max.poll.interval.ms   Max time between polls      300000                          300000-600000
auto.offset.reset      Offset reset policy         latest                          earliest for new groups
enable.auto.commit     Auto-commit offsets         true                            false for manual control
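
One related setting: static membership (group.instance.id, available since Kafka 2.3) gives each consumer a stable identity so that a quick restart does not trigger a rebalance. A sketch extending the props object above; the instance ID value is a placeholder:

props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-host-1");
// With a static instance ID, a member that restarts within
// session.timeout.ms rejoins without forcing a rebalance, so a larger
// session timeout is often paired with it
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");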

Consumer Group Implementation

1. Basic Consumer Group

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerGroupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumer: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                            Thread.currentThread().getName(),
                            record.partition(),
                            record.offset(),
                            record.key(),
                            record.value());

                    processMessage(record);
                }

                // Manual commit after processing
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }

    private static void processMessage(ConsumerRecord<String, String> record) {
        // Implement your message processing logic here
        System.out.println("Processing: " + record.value());
    }
}

2. Multi-Consumer Group

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiConsumerGroup {
    private final String groupId;
    private final String topic;
    private final int numConsumers;
    private final List<KafkaConsumer<String, String>> consumers;
    private final ExecutorService executor;

    public MultiConsumerGroup(String groupId, String topic, int numConsumers) {
        this.groupId = groupId;
        this.topic = topic;
        this.numConsumers = numConsumers;
        this.consumers = new ArrayList<>();
        this.executor = Executors.newFixedThreadPool(numConsumers);
    }

    public void start() {
        for (int i = 0; i < numConsumers; i++) {
            KafkaConsumer<String, String> consumer = createConsumer();
            consumers.add(consumer);

            // Copy the loop variable into an effectively final local for the lambda
            final String consumerName = "Consumer-" + i;
            executor.submit(() -> runConsumer(consumer, consumerName));
        }
    }

    private KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        return new KafkaConsumer<>(props);
    }

    private void runConsumer(KafkaConsumer<String, String> consumer, String consumerName) {
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s: Partition %d, Offset %d, Key: %s, Value: %s%n",
                            consumerName,
                            record.partition(),
                            record.offset(),
                            record.key(),
                            record.value());

                    processMessage(record);
                }

                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Expected on shutdown(); fall through and close the consumer
        } finally {
            consumer.close();
        }
    }

    private void processMessage(ConsumerRecord<String, String> record) {
        // Application-specific processing logic
    }

    public void shutdown() {
        // KafkaConsumer is not thread-safe; wakeup() is the only method that may
        // be called from another thread. Each worker then closes its own consumer.
        consumers.forEach(KafkaConsumer::wakeup);
        executor.shutdown();
    }
}

Consumer Group Management

1. Group Membership

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.MemberDescription;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class GroupMembership {
    private final AdminClient admin;

    public GroupMembership(AdminClient admin) {
        this.admin = admin;
    }

    public void checkGroupMembership() throws Exception {
        // List all consumer groups (an AdminClient operation, not a consumer one)
        List<String> groups = admin.listConsumerGroups().valid().get()
                .stream()
                .map(ConsumerGroupListing::groupId)
                .collect(Collectors.toList());

        // Describe each group; describedGroups() returns one future per group ID
        for (String groupId : groups) {
            ConsumerGroupDescription description = admin.describeConsumerGroups(
                    Collections.singletonList(groupId)).describedGroups().get(groupId).get();

            System.out.println("Group: " + groupId);
            System.out.println("State: " + description.state());
            System.out.println("Members: " + description.members().size());

            for (MemberDescription member : description.members()) {
                System.out.println("  Member: " + member.consumerId());
                System.out.println("  Client ID: " + member.clientId());
                System.out.println("  Host: " + member.host());
                System.out.println("  Assigned Partitions: " + member.assignment().topicPartitions());
            }
        }
    }
}

2. Partition Assignment Strategies

// Range Assignor (the classic default; since Kafka 3.0 clients default to
// RangeAssignor plus CooperativeStickyAssignor)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RangeAssignor");

// Round Robin Assignor
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");

// Sticky Assignor (minimizes partition movement between rebalances)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.StickyAssignor");

// Cooperative Sticky Assignor (sticky assignment plus incremental,
// non-stop-the-world rebalancing)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

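To move a live group from an eager assignor to cooperative rebalancing without downtime, the upgrade is done in two rolling restarts, with both assignors listed during the transition (a sketch of the procedure described in the Kafka upgrade notes):

// Step 1: roll every consumer with both assignors configured
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor," +
        "org.apache.kafka.clients.consumer.RangeAssignor");

// Step 2: once all instances run the config above, roll again with
// only the cooperative assignor
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
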
3. Custom Assignment Strategy

Custom assignors implement the public ConsumerPartitionAssignor interface (the older PartitionAssignor API was internal and has since been removed). A round-robin sketch:

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class CustomAssignor implements ConsumerPartitionAssignor {
    @Override
    public String name() {
        return "custom";
    }

    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();

        // Start every member with an empty partition list
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        subscriptions.keySet().forEach(member -> assignment.put(member, new ArrayList<>()));

        // Collect the topics the group subscribes to (this sketch assumes all
        // members share one subscription; a production assignor must honor
        // per-member topic lists)
        Set<String> topics = new TreeSet<>();
        subscriptions.values().forEach(subscription -> topics.addAll(subscription.topics()));

        // Custom assignment logic: simple round-robin over sorted members
        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members);
        int memberIndex = 0;

        for (String topic : topics) {
            Integer numPartitions = metadata.partitionCountForTopic(topic);
            if (numPartitions == null) {
                continue; // topic metadata not available
            }
            for (int partition = 0; partition < numPartitions; partition++) {
                String member = members.get(memberIndex % members.size());
                assignment.get(member).add(new TopicPartition(topic, partition));
                memberIndex++;
            }
        }

        // Wrap each member's partition list in an Assignment
        Map<String, Assignment> groupAssignment = new HashMap<>();
        assignment.forEach((member, partitions) ->
                groupAssignment.put(member, new Assignment(partitions)));
        return new GroupAssignment(groupAssignment);
    }

    @Override
    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
        // Optional hook: each member receives its final assignment here
    }
}
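
The assignor is then registered by class name on every consumer in the group; all members must agree on the strategy:

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CustomAssignor.class.getName());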

Consumer Group Monitoring

1. Consumer Lag Monitoring

Committed offsets are read through the AdminClient here; KafkaConsumer.position() only works for partitions currently assigned to the calling consumer, so it cannot observe another group's progress.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
import java.util.stream.Collectors;

public class ConsumerLagMonitor {
    private final AdminClient admin;
    private final KafkaConsumer<String, String> offsetReader; // used only for endOffsets()

    public ConsumerLagMonitor(AdminClient admin, KafkaConsumer<String, String> offsetReader) {
        this.admin = admin;
        this.offsetReader = offsetReader;
    }

    public Map<String, Long> getConsumerLag(String groupId, String topic) throws Exception {
        Map<String, Long> lag = new HashMap<>();

        // Committed offsets for the whole group (AdminClient call)
        Map<TopicPartition, OffsetAndMetadata> committed = admin
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata().get();

        Set<TopicPartition> partitions = committed.keySet().stream()
                .filter(tp -> tp.topic().equals(topic))
                .collect(Collectors.toSet());

        // Log-end offsets for the same partitions
        Map<TopicPartition, Long> endOffsets = offsetReader.endOffsets(partitions);

        for (TopicPartition partition : partitions) {
            long currentOffset = committed.get(partition).offset();
            long endOffset = endOffsets.get(partition);
            lag.put(partition.toString(), endOffset - currentOffset);
        }

        return lag;
    }

    public void monitorLag(String groupId, String topic, long threshold) throws Exception {
        Map<String, Long> lag = getConsumerLag(groupId, topic);

        for (Map.Entry<String, Long> entry : lag.entrySet()) {
            if (entry.getValue() > threshold) {
                System.err.println("High lag detected for " + entry.getKey() +
                        ": " + entry.getValue() + " messages");
            }
        }
    }
}
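
A minimal way to wire the monitor up; the bootstrap address, group, topic, and threshold are placeholders:

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(adminProps);

// A standalone consumer (no group.id required) used only to read end offsets
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> offsetReader = new KafkaConsumer<>(consumerProps);

new ConsumerLagMonitor(admin, offsetReader)
        .monitorLag("my-consumer-group", "my-topic", 1000L);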

2. Group Health Monitoring

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.ConsumerGroupState;
import java.util.Collections;

public class GroupHealthMonitor {
    private final AdminClient admin;

    public GroupHealthMonitor(AdminClient admin) {
        this.admin = admin;
    }

    public boolean isGroupHealthy(String groupId) {
        try {
            ConsumerGroupDescription description = describe(groupId);

            // A healthy group is in the STABLE state
            if (description.state() != ConsumerGroupState.STABLE) {
                return false;
            }

            // Check that every member owns at least one partition (note: idle
            // members are expected when consumers outnumber partitions)
            for (MemberDescription member : description.members()) {
                if (member.assignment().topicPartitions().isEmpty()) {
                    return false;
                }
            }

            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public void monitorGroupHealth(String groupId) throws Exception {
        if (!isGroupHealthy(groupId)) {
            System.err.println("Consumer group " + groupId + " is unhealthy");

            // Get detailed information
            ConsumerGroupDescription description = describe(groupId);
            System.err.println("Group state: " + description.state());
            System.err.println("Number of members: " + description.members().size());
        }
    }

    private ConsumerGroupDescription describe(String groupId) throws Exception {
        return admin.describeConsumerGroups(Collections.singletonList(groupId))
                .describedGroups().get(groupId).get();
    }
}

Consumer Group Patterns

1. Single Consumer Group

// Simple single consumer group
public class SingleConsumerGroup {
    public void run() {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "single-group");
        // ... other configuration

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                processMessage(record);
            }
            consumer.commitSync();
        }
    }
}

2. Multiple Consumer Groups

// Different consumer groups for different purposes: offsets are tracked per
// group, so each group independently receives every message on the topic.
public class MultipleConsumerGroups {
    public void run() {
        // Real-time processing group
        startConsumerGroup("realtime-processors", "user-events");

        // Analytics processing group
        startConsumerGroup("analytics-processors", "user-events");

        // Audit logging group
        startConsumerGroup("audit-loggers", "user-events");
    }

    private void startConsumerGroup(String groupId, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // ... other configuration

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));

        // Start processing in a separate thread (a KafkaConsumer must only be
        // used by the thread that owns it)
        new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    processMessageForGroup(groupId, record);
                }
                consumer.commitSync();
            }
        }).start();
    }
}

3. Consumer Group with Dead Letter Queue

public class ConsumerGroupWithDLQ {
    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> dlqProducer;

    public ConsumerGroupWithDLQ(KafkaConsumer<String, String> consumer,
                                KafkaProducer<String, String> dlqProducer) {
        this.consumer = consumer;
        this.dlqProducer = dlqProducer;
    }

    public void processWithDLQ() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                try {
                    processMessage(record);
                } catch (Exception e) {
                    // Route the failed record to the dead letter queue so the
                    // group can make progress past it
                    ProducerRecord<String, String> dlqRecord =
                            new ProducerRecord<>("dlq-topic", record.key(), record.value());
                    dlqProducer.send(dlqRecord);

                    System.err.println("Message sent to DLQ: " + record.value());
                }
            }

            // Commit after the batch: every record was either processed
            // successfully or routed to the DLQ
            consumer.commitSync();
        }
    }

    private void processMessage(ConsumerRecord<String, String> record) {
        // Application-specific processing; throw to trigger the DLQ path
    }
}
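
A common refinement is to attach provenance metadata to the DLQ record so a failure can be traced back to its source. A sketch that would slot into the catch block above; the header names are illustrative, not a convention Kafka defines:

// Build a DLQ record carrying the original topic/partition/offset as headers
// (requires org.apache.kafka.common.header.internals.RecordHeader
//  and java.nio.charset.StandardCharsets)
ProducerRecord<String, String> dlqRecord =
        new ProducerRecord<>("dlq-topic", record.key(), record.value());
dlqRecord.headers().add(new RecordHeader("dlq.original.topic",
        record.topic().getBytes(StandardCharsets.UTF_8)));
dlqRecord.headers().add(new RecordHeader("dlq.original.partition",
        String.valueOf(record.partition()).getBytes(StandardCharsets.UTF_8)));
dlqRecord.headers().add(new RecordHeader("dlq.original.offset",
        String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8)));
dlqProducer.send(dlqRecord);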

Consumer Group Administration

1. Command Line Tools

# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Describe consumer group (members, assigned partitions, and per-partition lag)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --describe

# Reset consumer group offsets (requires the group to have no active members;
# without --execute the command only prints a dry-run preview)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --topic my-topic --reset-offsets --to-earliest --execute

# Delete consumer group (also requires the group to be empty)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --delete
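
Other reset targets follow the same pattern; two commonly used variants (the timestamp and shift values are placeholders):

# Reset to a specific point in time
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --topic my-topic --reset-offsets \
  --to-datetime 2024-01-01T00:00:00.000 --execute

# Rewind every partition by 100 messages
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --topic my-topic --reset-offsets \
  --shift-by -100 --execute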

2. Offset Management

public class OffsetManager {
    private final KafkaConsumer<String, String> consumer;

    public OffsetManager(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void resetOffsets(String topic, long offset) {
        // Note: committing offsets this way is only safe while no other member
        // of the group owns these partitions; for an active group, prefer the
        // kafka-consumer-groups.sh tooling shown above.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        for (PartitionInfo partition : partitions) {
            TopicPartition topicPartition = new TopicPartition(topic, partition.partition());
            offsets.put(topicPartition, new OffsetAndMetadata(offset));
        }

        consumer.commitSync(offsets);
    }

    public void seekToTimestamp(String topic, long timestamp) {
        // seek() requires the partitions to be assigned to this consumer,
        // e.g. via consumer.assign(...)
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);

        for (PartitionInfo partition : partitions) {
            TopicPartition topicPartition = new TopicPartition(topic, partition.partition());

            Map<TopicPartition, Long> timestamps =
                    Collections.singletonMap(topicPartition, timestamp);
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(timestamps);

            OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
            if (offsetAndTimestamp != null) {
                consumer.seek(topicPartition, offsetAndTimestamp.offset());
            }
        }
    }
}
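
A hedged usage sketch for the timestamp seek, using manual assignment so that seek() is legal; the topic name and look-back window are placeholders:

// Assign all partitions of the topic manually, then rewind to a point in time
List<PartitionInfo> infos = consumer.partitionsFor("my-topic");
List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo info : infos) {
    partitions.add(new TopicPartition("my-topic", info.partition()));
}
consumer.assign(partitions);

long oneHourAgo = System.currentTimeMillis() - 3_600_000L;
new OffsetManager(consumer).seekToTimestamp("my-topic", oneHourAgo);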

Best Practices for Consumer Groups

1. Group Naming

// Use descriptive group names
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-events-processor-v1");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-data-processor");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "audit-log-processor");

// Include environment in group name
props.put(ConsumerConfig.GROUP_ID_CONFIG, "prod-user-events-processor");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "staging-user-events-processor");

2. Configuration Optimization

// Optimize for your use case
Properties props = new Properties();

// For high-throughput processing
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);

// For low-latency processing
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

3. Error Handling

public class RobustConsumerGroup {
    // Sketch: the consumer field, ProcessingException, and the handle* methods
    // are application-defined; this shows the error-handling structure.
    public void runWithErrorHandling() {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        processMessage(record);
                    } catch (ProcessingException e) {
                        // Recoverable, application-level processing errors
                        handleProcessingError(record, e);
                    } catch (Exception e) {
                        // Unexpected errors
                        handleUnexpectedError(record, e);
                    }
                }

                consumer.commitSync();

            } catch (WakeupException e) {
                // consumer.wakeup() was called: exit the loop and shut down
                break;
            } catch (Exception e) {
                // Errors thrown by poll() or commitSync() themselves
                handlePollError(e);
            }
        }
    }
}

Best Practices Summary

  1. Use descriptive group names that indicate the purpose and version
  2. Configure appropriate session timeouts based on processing requirements
  3. Monitor consumer lag to ensure timely processing
  4. Implement proper error handling with retry logic and dead letter queues
  5. Use manual offset management for critical applications
  6. Monitor group health and member status
  7. Plan for group scaling by adding/removing consumers as needed
  8. Test group behavior under various failure scenarios
  9. Document group purposes and configurations for team knowledge
  10. Use appropriate partition assignment strategies based on your use case

Kafka Consumer Groups are essential for building scalable and fault-tolerant data processing applications. Understanding group behavior, management, and best practices is crucial for building reliable streaming applications.